Expose subgraph logs via subgraph GraphQL #6278
base: master
Conversation
Force-pushed from 688827a to 120d61b
Introduces the foundation for the log store system with:
- LogStore trait for querying logs from backends
- LogLevel enum with FromStr trait implementation
- LogEntry and LogQuery types for structured log data
- LogStoreFactory for creating backend instances
- NoOpLogStore as default (disabled) implementation
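A minimal sketch of how these pieces might fit together, based only on the names in this commit message; every signature, field, and the use of async_trait is an assumption, not the PR's actual code:

```rust
use std::str::FromStr;

/// Log levels exposed by the query API (assumed variants from the commit list).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLevel {
    Critical,
    Error,
    Warning,
    Info,
    Debug,
}

impl FromStr for LogLevel {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_uppercase().as_str() {
            "CRITICAL" => Ok(LogLevel::Critical),
            "ERROR" => Ok(LogLevel::Error),
            "WARNING" => Ok(LogLevel::Warning),
            "INFO" => Ok(LogLevel::Info),
            "DEBUG" => Ok(LogLevel::Debug),
            other => Err(format!("unknown log level: {other}")),
        }
    }
}

/// A single structured log record returned to the caller (assumed fields).
pub struct LogEntry {
    pub id: String,
    pub timestamp: String,
    pub level: LogLevel,
    pub text: String,
}

/// Filters accepted when querying logs (assumed shape).
pub struct LogQuery {
    pub level: Option<LogLevel>,
    pub from: Option<String>,   // ISO 8601 timestamp
    pub to: Option<String>,     // ISO 8601 timestamp
    pub search: Option<String>, // substring match on the message text
    pub first: u32,             // page size
    pub skip: u32,              // page offset
}

/// Placeholder error type; the PR has its own LogStoreError enum.
pub struct LogStoreError(pub String);

/// Backend-agnostic query interface; File, Elasticsearch, and Loki
/// backends would each implement this.
#[async_trait::async_trait]
pub trait LogStore: Send + Sync {
    async fn query_logs(
        &self,
        deployment: &str,
        query: LogQuery,
    ) -> Result<Vec<LogEntry>, LogStoreError>;
}

/// Default when log querying is disabled: returns no entries.
pub struct NoOpLogStore;

#[async_trait::async_trait]
impl LogStore for NoOpLogStore {
    async fn query_logs(
        &self,
        _deployment: &str,
        _query: LogQuery,
    ) -> Result<Vec<LogEntry>, LogStoreError> {
        Ok(Vec::new())
    }
}
```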
Implements three log storage backends for querying logs:
- FileLogStore: Streams JSON Lines files with bounded memory usage
- ElasticsearchLogStore: Queries Elasticsearch indices with full-text search
- LokiLogStore: Queries Grafana Loki using LogQL

All backends implement the LogStore trait and support:
- Filtering by log level, timestamp range, and text search
- Pagination via first/skip parameters
- Returning structured LogEntry objects

Dependencies added: reqwest, serde_json for HTTP clients.
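For the file backend, the query path is conceptually a filtered scan over a JSON Lines file. A simplified sketch reusing the assumed LogQuery/LogEntry/LogLevel shapes from the previous snippet (the real implementation streams with bounded memory; the JSON field names are assumptions):

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

/// Scan a JSON Lines log file, keeping entries that match the filters.
fn scan_file(path: &Path, query: &LogQuery) -> std::io::Result<Vec<LogEntry>> {
    let reader = BufReader::new(File::open(path)?);
    let mut matched = Vec::new();
    for line in reader.lines() {
        let line = line?;
        // Lines that fail to parse are skipped; see the review thread
        // below about surfacing such errors instead of eating them.
        let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) else {
            continue;
        };
        let text = value["text"].as_str().unwrap_or_default().to_string();
        let level = value["level"]
            .as_str()
            .and_then(|s| s.parse::<LogLevel>().ok());
        if query.level.is_some() && level != query.level {
            continue;
        }
        if let Some(search) = &query.search {
            if !text.contains(search.as_str()) {
                continue;
            }
        }
        matched.push(LogEntry {
            id: value["id"].as_str().unwrap_or_default().to_string(),
            timestamp: value["timestamp"].as_str().unwrap_or_default().to_string(),
            level: level.unwrap_or(LogLevel::Info),
            text,
        });
    }
    // Pagination is applied after filtering.
    Ok(matched
        .into_iter()
        .skip(query.skip as usize)
        .take(query.first as usize)
        .collect())
}
```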
Implements slog drains for capturing and writing logs:
- FileDrain: Writes logs to JSON Lines files (one file per subgraph)
- LokiDrain: Writes logs to Grafana Loki via HTTP push API

Both drains:
- Capture structured log entries with metadata (module, line, column)
- Format logs with timestamp, level, text, and arguments
- Use efficient serialization with custom KVSerializers
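On the write side, an slog drain receives each record as it is logged. A minimal sketch in the spirit of the PR's FileDrain; the exact JSON shape and the Mutex-based locking are assumptions, while the slog::Drain trait itself is the real API:

```rust
use std::io::Write;
use std::sync::Mutex;

/// Appends one JSON object per log record to a file.
pub struct FileDrain {
    file: Mutex<std::fs::File>,
}

impl slog::Drain for FileDrain {
    type Ok = ();
    type Err = std::io::Error;

    fn log(
        &self,
        record: &slog::Record,
        _values: &slog::OwnedKVList,
    ) -> Result<Self::Ok, Self::Err> {
        // Capture the message plus source-location metadata
        // (module, line, column), one JSON object per line.
        let entry = serde_json::json!({
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "level": record.level().as_str(),
            "text": format!("{}", record.msg()),
            "meta": {
                "module": record.module(),
                "line": record.line(),
                "column": record.column()
            }
        });
        let mut file = self.file.lock().unwrap();
        writeln!(file, "{}", entry)
    }
}
```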
Adds a configuration layer for selecting and configuring log backends:
- LogStoreConfig enum with variants: Disabled, File, Elasticsearch, Loki
- LogConfigProvider for loading config from environment variables and CLI args
- Unified GRAPH_LOG_STORE_* environment variable naming
- CLI arguments with --log-store-backend and backend-specific options
- Configuration precedence: CLI args > env vars > defaults
- Deprecation warnings for old config variables

Supported configuration:
- Backend selection (disabled, file, elasticsearch, loki)
- File: directory, max size, retention days
- Elasticsearch: endpoint, credentials, index, timeout
- Loki: endpoint, tenant ID
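The enum and precedence rule described above might look roughly like this; variant fields and the exact env var name are assumptions beyond the GRAPH_LOG_STORE_* scheme and --log-store-backend flag named in the commit:

```rust
use std::path::PathBuf;

/// Assumed shape of the backend-selection config.
pub enum LogStoreConfig {
    Disabled,
    File {
        directory: PathBuf,
        max_file_size: u64,
        retention_days: u32,
    },
    Elasticsearch {
        endpoint: String,
        username: Option<String>,
        password: Option<String>,
        index: String,
        timeout_secs: u64,
    },
    Loki {
        endpoint: String,
        tenant_id: Option<String>,
    },
}

impl LogStoreConfig {
    /// CLI args win over environment variables, which win over the default.
    /// `cli_backend` would come from --log-store-backend.
    fn backend_name(cli_backend: Option<String>) -> String {
        cli_backend
            .or_else(|| std::env::var("GRAPH_LOG_STORE_BACKEND").ok())
            .unwrap_or_else(|| "disabled".to_string())
    }
}
```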
Refactors LoggerFactory to use LogStoreConfig instead of the elastic-only config:
- Replaced elastic_config with a log_store_config parameter
- Build ElasticLoggingConfig on demand from LogStoreConfig::Elasticsearch
- Support all log drain types (File, Loki, Elasticsearch)
- Maintain backward compatibility with the existing elastic configuration

This enables the factory to create drains for any configured backend while preserving the existing component logger patterns.
Adds a GraphQL API for querying subgraph logs.

Schema types:
- LogLevel enum (CRITICAL, ERROR, WARNING, INFO, DEBUG)
- _Log_ type with id, timestamp, level, text, arguments, meta
- _LogArgument_ type for structured key-value pairs
- _LogMeta_ type for source location (module, line, column)

Query field (_logs) with filters:
- level: Filter by log level
- from/to: Timestamp range (ISO 8601)
- search: Text search in log messages
- first/skip: Pagination (first max 1000, skip max 10000)
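Put together, the schema additions might read roughly as the following SDL, reconstructed from the field list above; nullability and exact scalar choices are assumptions, not copied from the PR:

```graphql
enum LogLevel {
  CRITICAL
  ERROR
  WARNING
  INFO
  DEBUG
}

type _Log_ {
  id: ID!
  timestamp: String!
  level: LogLevel!
  text: String!
  arguments: [_LogArgument_!]!
  meta: _LogMeta_
}

type _LogArgument_ {
  key: String!
  value: String!
}

type _LogMeta_ {
  module: String
  line: Int
  column: Int
}
```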
Integrates the _logs query into the GraphQL execution pipeline.

Execution layer:
- Execute _logs queries via log_store.query_logs()
- Convert LogEntry results to GraphQL response objects
- Handle log store errors gracefully

Query parsing:
- Recognize _logs as a special query field
- Build LogQuery from GraphQL arguments
- Pass log_store to the execution context

Service wiring:
- Create the log store from configuration in the launcher
- Provide the log store to the GraphQL runner
- Use NoOpLogStore in test environments

This completes the read path from GraphQL query to log storage backend.
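The argument-to-LogQuery step, including the documented pagination caps, might look like this; it reuses the assumed types from the earlier sketch, and the simplified argument extraction is not graph-node's real execution context:

```rust
/// Build a LogQuery from the GraphQL arguments, enforcing the caps
/// from the schema (first <= 1000, skip <= 10000).
fn build_log_query(
    level: Option<&str>,
    from: Option<String>,
    to: Option<String>,
    search: Option<String>,
    first: Option<u32>,
    skip: Option<u32>,
) -> Result<LogQuery, LogStoreError> {
    // LogLevel's FromStr impl does the enum-name parsing.
    let level = level.map(str::parse::<LogLevel>).transpose().map_err(LogStoreError)?;
    Ok(LogQuery {
        level,
        from,
        to,
        search,
        first: first.unwrap_or(100).min(1000),
        skip: skip.unwrap_or(0).min(10_000),
    })
}
```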
Force-pushed from 120d61b to ee0f228
Force-pushed from a4432ca to 384bf35
One thing I wonder about: should this be configured via environment variables or through …
Adds comprehensive integration test for the _logs query:
- Deploys the logs-query subgraph and waits for sync
- Triggers contract events to generate logs
- Queries the _logs field with various filters
- Verifies log entries are returned correctly
- Tests filtering by level and text search
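In spirit, the assertion at the heart of that test might look like the excerpt below; `query_subgraph` is a placeholder standing in for the test harness's real helper, assumed to return a serde_json::Value:

```rust
/// Hypothetical excerpt, not the PR's actual test code.
async fn assert_error_logs_filtered() -> Result<(), Box<dyn std::error::Error>> {
    let response = query_subgraph(
        "logs-query",
        r#"{ _logs(level: ERROR, first: 10) { level text } }"#,
    )
    .await?;

    // Every returned entry should match the level filter.
    for log in response["data"]["_logs"].as_array().unwrap() {
        assert_eq!(log["level"], "ERROR");
    }
    Ok(())
}
```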
- Create graph/src/log/common.rs for common log drain functionality
- SimpleKVSerializer: Concatenates KV pairs to strings
- VecKVSerializer: Collects KV pairs into Vec<(String, String)>
- HashMapKVSerializer: Collects KV pairs into HashMap
- LogMeta: Shared metadata structure (module, line, column)
- LogEntryBuilder: Builder for common log entry fields
- level_to_str(): Converts slog::Level to string
- create_async_logger(): Consistent async logger creation
- Updated FileDrain, LokiDrain, and ElasticDrain to use the common log utilities
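A sketch of the HashMap-collecting serializer named above. slog invokes a Serializer once per KV pair attached to a record, and all the typed emit_* methods default to emit_arguments, so a simple collector only needs that one method; the struct name matches the commit, the rest is assumed:

```rust
use std::collections::HashMap;
use std::fmt;

/// Collects the record's key-value pairs as strings.
#[derive(Default)]
struct HashMapKVSerializer {
    kvs: HashMap<String, String>,
}

impl slog::Serializer for HashMapKVSerializer {
    fn emit_arguments(&mut self, key: slog::Key, val: &fmt::Arguments) -> slog::Result {
        self.kvs.insert(key.to_string(), val.to_string());
        Ok(())
    }
}
```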
- Include _logs in the set of special fields that bypass indexing-error shortcutting when a subgraph has failed
- Add an integration test to ensure _logs queries return logs after a subgraph has failed
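The bypass itself is presumably a membership check along these lines; the function name and the exact contents of the set are assumptions based on this commit message:

```rust
/// Fields that may still be queried after a subgraph has failed with
/// a deterministic indexing error; `_logs` is the addition this
/// commit describes.
fn bypasses_indexing_error_shortcut(field_name: &str) -> bool {
    matches!(field_name, "_logs" | "_meta" | "__typename")
}
```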
Force-pushed from 384bf35 to 881e55a
dwerner left a comment:
LGTM! I have a few questions/suggestions, but yolo
///
/// # Returns
/// The parsed u64 value, or the default if parsing fails or neither key is set
pub fn read_u64_with_fallback(logger: &Logger, new_key: &str, old_key: &str, default: u64) -> u64 {
Nit: could lift the generic style of str::parse and avoid needing multiple functions; you get a specialization for any FromStr implementer for free.
i.e.
pub fn parse<F: FromStr>(&self) -> Result<F, F::Err> {
    FromStr::from_str(self)
}
.client
.post(&url)
.json(&query_body)
.timeout(self.timeout);
👍 for timeout.
pub struct FileLogStore {
    directory: PathBuf,
    // TODO: Implement log rotation when file exceeds max_file_size
TODOs left; not sure if they're intended to be implemented in this pass.
}

/// Parse a JSON line into a LogEntry
fn parse_line(&self, line: &str) -> Option<LogEntry> {
Design: idiomatically, parse methods return a Result<T, SomeErrorType> and I would expect that here, but I get the practical reasons. Worth considering a custom error type for the log store? Particularly because we ignore the line entirely, it's effectively eaten if there's a parse error.
Edit: further reading suggests there is a LogStoreError enum already. Use it here?
    Ok(entries)
}

fn parse_log_entry(
See previous comment - error enum useful here?
use crate::prelude::DeploymentHash;

#[derive(Error, Debug)]
Continuing my suggestion/question: maybe a child error enum that captures errors on retrieval/parsing?
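A sketch of what that child enum could look like with thiserror (which the `#[derive(Error, Debug)]` above already implies); the name and variants here are assumptions illustrating the reviewer's idea, not code from the PR:

```rust
use thiserror::Error;

/// Hypothetical child enum for errors hit while retrieving or
/// parsing stored logs, nested inside the existing store error.
#[derive(Error, Debug)]
pub enum LogRetrievalError {
    #[error("failed to parse log line: {0}")]
    Parse(String),
    #[error("backend request failed: {0}")]
    Backend(String),
}
```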
    },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Suggestion: is there a need for a custom LogLevel impl? I feel like you'd get all of this for free if you used tracing::Level, including the FromStr impls.
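For reference, what the reviewer's suggestion gives for free; tracing is a real crate, but its use here is illustrative rather than part of the PR:

```rust
use std::str::FromStr;
use tracing::Level;

fn main() {
    // tracing::Level already implements FromStr (case-insensitive)
    // and Display, so no hand-rolled conversions are needed.
    let level = Level::from_str("ERROR").unwrap();
    assert_eq!(level, Level::ERROR);
    assert_eq!(level.to_string(), "ERROR");
}
```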
}

/// Converts an slog Level to a string representation
pub fn level_to_str(level: Level) -> &'static str {
Related to last comment on custom log level type.
  /// Serializes an slog log level using a serde Serializer.
- fn serialize_log_level<S>(level: &Level, serializer: S) -> Result<S::Ok, S::Error>
+ fn serialize_log_level<S>(level: &str, serializer: S) -> Result<S::Ok, S::Error>
Was this previously Level coming in from the slog::* log import? Related to the log level stuff above.
This PR introduces a more flexible subgraph log storage and querying system for Graph Node and enables subgraph logs to be queried through the GraphQL subgraph query API. The implementation supports multiple log storage backends (File, Elasticsearch, and Loki) with a consistent query interface exposed to users in the subgraph's GraphQL schema.
What's new

GraphQL Query API
- A _logs field in each subgraph's GraphQL schema, with filters for level, timestamp range (from/to), text search, and first/skip pagination.

Storage Backends
- File (JSON Lines), Elasticsearch, and Grafana Loki, plus a disabled no-op default.

Architecture
- A backend-agnostic LogStore trait for reads, slog drains for writes, and a LogStoreConfig layer that selects a backend via CLI arguments or GRAPH_LOG_STORE_* environment variables.
Examples
Querying logs
{
  _logs(
    level: ERROR
    search: "timeout"
    from: "2024-01-15T00:00:00Z"
    to: "2024-01-16T00:00:00Z"
    first: 100
  ) {
    id
    timestamp
    level
    text
    arguments {
      key
      value
    }
    meta {
      module
      line
      column
    }
  }
}

Configuring the log store backend
File-based (development):
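The concrete settings were collapsed in the rendered page. Given the GRAPH_LOG_STORE_* scheme described above, a file setup would presumably look something like this; the exact variable names are assumptions:

```
GRAPH_LOG_STORE_BACKEND=file
GRAPH_LOG_STORE_FILE_DIRECTORY=/var/log/graph-node/subgraphs
```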
Loki (production):
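And for Loki, again with assumed variable names:

```
GRAPH_LOG_STORE_BACKEND=loki
GRAPH_LOG_STORE_LOKI_ENDPOINT=http://loki:3100
GRAPH_LOG_STORE_LOKI_TENANT_ID=graph-node
```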